package cn.quantgroup.rabbitmq.workqueue;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by mingzhu on 2016/12/12.
 */
public class Worker {
    private static final String QUEUE_NAME = "worker_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        boolean durable = false;
        //声明队列,消息接受者再次声明队列.确保消息队列存在,队列的创建是幂等的，存在的队列不会再次创建
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //使用信道创建消息消费者
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        //设置自动确认为true.此处的自动确认会导致丢失消息.比如消费者未处理完成而被杀死,链接断开等等.
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

    /**
     * 判断消息中.的数量，每个.休眠一秒，模拟耗时的任务
     * @param task
     * @throws InterruptedException
     */
    public static void doWork(String task) throws InterruptedException{
        for(char ch : task.toCharArray()){
            if(ch == '.'){
                Thread.sleep(1000);
            }
        }
    }
}